Feature Implementation: AWS Glue Job Execution Support #308
Would love to use this!
prefect_aws/glue_job.py
Outdated
    aws_secret_access_key="your_secret_access_key"
)
glue_job = GlueJob(
    glue_job_name="your_glue_job_name",
Here you are referencing glue_job_name, but I believe you are actually using job_name instead.
@feliperazeek
Thanks for your comment. Fixed in 8f9e9af.
Thanks for the contribution @knakazawa99! It looks like the tests are failing with a missing pyparsing dependency. You can add pyparsing to requirements.txt to ensure it gets installed with this library. Let me know when the tests are passing and I can give the change a more thorough review.
Also, we introduced a worker concept a little while ago that can be used in place of infrastructure blocks. If you're willing, it'd be great if you could also create an AWS Glue worker! It shouldn't be too tough since you've already created an infrastructure block. We have a guide on creating a worker here which will be helpful if you decide to create an AWS Glue worker.
@desertaxle
This fix sounds good to me!
I added a few high-level comments! I'll do another pass once the tests are passing!
arguments: Optional[dict] = Field(
    default=None,
    title="AWS Glue Job Arguments",
    description="The job arguments associated with this run.",
)
I like how flexible this field is, but it'd be nice to have some stronger typing while maintaining flexibility. Introducing a variables class that contains the values that will often vary between deployments, and creating a default template, can help with that. You can check out the ECS worker's variables class as an example.
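To make the suggestion concrete, here is a rough sketch of what a typed variables class could look like. It uses a standard-library dataclass purely as a stand-in so the snippet runs without Prefect or Pydantic installed; it is not the ECS worker's variables class. The class name GlueJobVariablesSketch, the 60-second default, and the validation rules are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class GlueJobVariablesSketch:
    """Hypothetical typed container for values that vary between deployments."""

    job_name: str
    # Glue run arguments are a flat string-to-string mapping.
    arguments: Dict[str, str] = field(default_factory=dict)
    # Assumed default; the PR does not state one in this thread.
    job_watch_poll_interval: float = 60.0

    def __post_init__(self) -> None:
        # Fail early on values that could never produce a valid run request.
        if not self.job_name:
            raise ValueError("job_name must be a non-empty string")
        for key, value in self.arguments.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise TypeError("arguments must map str keys to str values")
        if self.job_watch_poll_interval <= 0:
            raise ValueError("job_watch_poll_interval must be positive")


variables = GlueJobVariablesSketch(
    job_name="your_glue_job_name",
    arguments={"--input_path": "s3://example-bucket/in"},
)
print(variables)
```

The arguments field stays an open mapping, so callers keep the flexibility of the original dict while obvious mistakes are rejected at construction time.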
Thanks, fixed it.
Does this match what you had in mind?
e71b947
job_name: str = Field(
    ...,
    title="AWS Glue Job Name",
    description="The name of the job definition to use.",
)
The BaseJobConfiguration class has a name attribute that automatically gets populated with the flow name. I think it'd be worthwhile to use that attribute rather than introducing another name attribute.
- job_name: str = Field(
-     ...,
-     title="AWS Glue Job Name",
-     description="The name of the job definition to use.",
- )
job_name is required to start a Glue job run:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html
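For context, a minimal sketch of the call being discussed. The boto3 Glue client's start_job_run takes the name of an existing job as its required JobName parameter and returns a JobRunId. The helper start_glue_run and the StubGlueClient class are hypothetical and exist only so the snippet runs without AWS credentials; with real credentials, boto3.client("glue") would be passed in instead.

```python
from typing import Any, Dict, List, Optional


def start_glue_run(
    client: Any, job_name: str, arguments: Optional[Dict[str, str]] = None
) -> str:
    """Start a run of an existing Glue job and return its run ID.

    `client` is expected to behave like boto3.client("glue").
    """
    kwargs: Dict[str, Any] = {"JobName": job_name}  # required by start_job_run
    if arguments:
        kwargs["Arguments"] = arguments  # optional per-run arguments
    response = client.start_job_run(**kwargs)
    return response["JobRunId"]


class StubGlueClient:
    """Hypothetical stand-in for the Glue client; records calls, no AWS access."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def start_job_run(self, **kwargs: Any) -> Dict[str, str]:
        self.calls.append(kwargs)
        return {"JobRunId": "jr_stub_0001"}


stub = StubGlueClient()
run_id = start_glue_run(
    stub, "your_glue_job_name", {"--input_path": "s3://example-bucket/in"}
)
print(run_id)
```

Because the job name is the only identifier the call accepts, the block cannot fall back on a name derived from the flow unless a Glue job with that name already exists.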
Ah, does this require a Glue job to exist already? Usually, workers dynamically create jobs to execute flow runs.
I am a user who is very interested in this feature; I have actually forked this into my repo already. I think it's expected that the Glue job already exists.
@feliperazeek are you using the worker to run Glue jobs or the block in this PR?
The variable name is good and will work, but I think the implementation of the worker needs to change to operate the same as our other workers. I'd expect a Prefect Glue worker to create a Glue job for each flow run that it picks up and then monitor that Glue job for completion.
@knakazawa99 I think we can take this PR in one of two directions:
- Update the worker and block to create new Glue jobs for executing flow runs
- Remove the worker and update the block to implement the JobBlock interface instead of the Infrastructure interface. This will make it more suited for executing existing Glue jobs (which I'm realizing might have been the original intent of the PR, which I misunderstood).
Let me know which direction you'd like to go with this PR, and I will do whatever I can to help. Thank you for your patience and sticking with this PR!
@feliperazeek are you using the worker to run Glue jobs or the block in this PR?
Running glue jobs
@desertaxle
Is the following direction correct?
- Delete the worker
- Modify the block that inherits from Infrastructure so that it implements the JobBlock interface instead
Apologies for the delay. Yes, that is the direction that I'm suggesting.
@desertaxle
I have fixed it. However, the tests are failing because of boto3, and I have not been able to reproduce the error in my local environment.
Hey @knakazawa99, apologies for the delay. I pushed a change that fixes the error you saw in the test. I'll give this one more review today so we can merge it as soon as possible.
prefect_aws/glue_job.py
Outdated
async def trigger(self) -> GlueJobRun:
    """trigger for GlueJobRun"""
    return GlueJobRun(
        job_name=self.job_name,
        arguments=self.arguments,
        job_watch_poll_interval=self.job_watch_poll_interval,
    )
I think we should start the job run here and then return the GlueJobRun object. The user can choose to wait for the job, or not, but the job should run whether or not they wait for it.
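A rough sketch of the behavior described in this comment, under several assumptions: it is synchronous for brevity (the method in the PR is async), the class names GlueJobBlockSketch and GlueJobRunSketch are hypothetical, and StubGlueClient stands in for boto3.client("glue") so the snippet runs without AWS access. The calls it mimics, start_job_run and get_job_run, and the JobRunState values are from the boto3 Glue client; everything else is illustrative and not the code that was merged.

```python
import time
from typing import Any, Dict, Optional


class GlueJobRunSketch:
    """Handle for a run that has already been started."""

    FAILED_STATES = {"FAILED", "STOPPED", "TIMEOUT", "ERROR"}

    def __init__(self, client: Any, job_name: str, job_id: str, poll_interval: float):
        self.client = client
        self.job_name = job_name
        self.job_id = job_id
        self.poll_interval = poll_interval
        self.final_state: Optional[str] = None

    def wait_for_completion(self) -> None:
        """Poll the run until it succeeds or reaches a failed state."""
        while True:
            response = self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)
            state = response["JobRun"]["JobRunState"]
            if state == "SUCCEEDED":
                self.final_state = state
                return
            if state in self.FAILED_STATES:
                raise RuntimeError(f"Glue job run {self.job_id} ended in state {state}")
            time.sleep(self.poll_interval)


class GlueJobBlockSketch:
    def __init__(self, client: Any, job_name: str,
                 arguments: Optional[Dict[str, str]] = None,
                 job_watch_poll_interval: float = 0.0):
        self.client = client
        self.job_name = job_name
        self.arguments = arguments
        self.job_watch_poll_interval = job_watch_poll_interval

    def trigger(self) -> GlueJobRunSketch:
        """Start the run immediately, then hand back a handle to it."""
        kwargs: Dict[str, Any] = {"JobName": self.job_name}
        if self.arguments:
            kwargs["Arguments"] = self.arguments
        job_id = self.client.start_job_run(**kwargs)["JobRunId"]
        return GlueJobRunSketch(
            self.client, self.job_name, job_id, self.job_watch_poll_interval
        )


class StubGlueClient:
    """Hypothetical stub: reports RUNNING once, then SUCCEEDED."""

    def __init__(self) -> None:
        self.started = False
        self._states = ["RUNNING", "SUCCEEDED"]

    def start_job_run(self, **kwargs: Any) -> Dict[str, str]:
        self.started = True
        return {"JobRunId": "jr_stub_0001"}

    def get_job_run(self, JobName: str, RunId: str) -> Dict[str, Any]:
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return {"JobRun": {"JobRunState": state}}


client = StubGlueClient()
run = GlueJobBlockSketch(client, "your_glue_job_name").trigger()
started_before_wait = client.started  # the run exists even if nobody waits
run.wait_for_completion()             # waiting is optional for the caller
print(started_before_wait, run.final_state)
```

The point of the split is that trigger owns the side effect of starting the run, while the returned object only observes it.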
Thank you for your comment.
I misunderstood the responsibilities of Glue Job Run.
It has been corrected.
prefect_aws/glue_job.py
Outdated
client: Any = Field(default=None, description="")
job_id: str = Field(
    default="",
)
These look like they could be private fields.
The Glue Job Run responsibilities have changed and require public access.
from prefect_aws import AwsCredentials


class GlueJobRun(JobRun, BaseModel):
This class doesn't need to be a Pydantic BaseModel since we don't need validation. I think it might be simpler to make it a plain Python class.
- class GlueJobRun(JobRun, BaseModel):
+ class GlueJobRun(JobRun):
I find it useful to depend on pydantic for uniformity with other object definitions, validation, and attribute descriptions.
@desertaxle
LGTM! Thanks so much for your patience and perseverance @knakazawa99!
This pull request implements the ability to execute AWS Glue Jobs directly from Prefect. This feature was developed based on the existing ECS Task program and is intended to provide a powerful preprocessor for data used in aggregate processing and machine learning models.
Closes #307
Checklist
- Run pre-commit checks (pre-commit install && pre-commit run --all) locally for formatting and linting.
- Run mkdocs serve to view documentation locally.